perf(unbundle): skip re-emit for already-indexed children (PE-9098)#740
perf(unbundle): skip re-emit for already-indexed children (PE-9098)#740arielmelendez wants to merge 2 commits into
Conversation
When the bundle repair worker re-queues a bundle (because some of its children were dropped at the data-item indexer's queue cap on a prior attempt), the parser previously re-emitted EVERY child — including the 90%+ that were already indexed. Each redundant emit consumed a slot in the dataItemIndexer / ans104DataIndexer queues, contending with the genuinely-new children for the cap. Under sustained backfill load this was the dominant contributor to the 2M+ items dropped per hour observed on the Turbo gw2 indexer. The fix: before parsing, the Ans104Unbundler queries the BundleIndex for the set of ids already present in new_data_items or stable_data_items under this parent. The set is passed to the parser worker. For each parsed item whose id is in the set, the parser increments matchedItemCount (preserving the bundle's matched_data_item_count invariant — see below) and skips the filter eval, hash, and DATA_ITEM_MATCHED emit. Net effect: redundant queue traffic disappears and the downstream queues breathe. matched_data_item_count invariant: upsertBundle uses `IFNULL(@matched, matched)` to update the column, and the fully-indexed predicate is strict equality between bundle_data_items row count and matched_data_item_count. If we lowered the emitted matched count by the skip amount, the new lower value would replace the existing one and the equality would never hold (rows > matched_data_item_count from then on, so fully_indexed_at never sets). Counting skips toward matchedItemCount avoids this. Assumption: the unbundle/index filter has not changed between parses — when filters do change, the filter-reprocess path clears the bundle separately and skipChildIds is empty for those. Failure handling: if getIndexedChildIds throws, we log and proceed without a skip-set rather than failing the unbundle. Worst case is the pre-fix behavior of redundant emits, not a regression. Five-edit DB plumbing for `selectIndexedChildIds` + `getIndexedChildIds`: SQL, worker impl, queue wrapper, message handler, BundleIndex interface. Tests cover three cases: - no bundleIndex provided -> skipChildIds undefined (unchanged behavior) - bundleIndex returns ids -> skipChildIds passed to parser - bundleIndex throws -> skipChildIds undefined, unbundle still proceeds Companion to PE-9098-bundle-repair-bdi-routing (the previous PR); together they address the BDI repair pipeline end-to-end.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## PE-9098-bundle-repair-bdi-routing #740 +/- ##
=====================================================================
- Coverage 78.20% 78.14% -0.07%
=====================================================================
Files 121 121
Lines 42799 42908 +109
Branches 3225 3230 +5
=====================================================================
+ Hits 33471 33530 +59
- Misses 9286 9336 +50
Partials 42 42 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Two refinements to the parser-skip-already-indexed work, addressing feedback from review: 1. **Tighten the predicate to `data_offset IS NOT NULL`** on both `new_data_items` and `stable_data_items`. The optimistic insert path (insertOrIgnoreNewDataItem, used by the admin `/queue-data-item` route) intentionally leaves the eleven root-atom columns NULL — including `data_offset` — expecting a subsequent full parser-emit to fill them via the COALESCE/IFNULL clauses in upsertNewDataItem's `ON CONFLICT UPDATE`. Without this predicate, our skip-set would include optimistically-inserted rows and strand their tuple fields permanently NULL across re-parses. With it, we only skip rows whose tuple fields are already populated — the case where re-emit is a true no-op. Cost: the existing covering indexes `(parent_id, id)` no longer cover the query (data_offset isn't in the index). Each row pays one extra page read. For a 10K-child bundle, ~10K extra heap reads. Still fast (sub-100ms worst case). If this becomes hot we can add a partial index `(parent_id, id) WHERE data_offset IS NOT NULL`. 2. **Add `bundles_data_items_skipped_already_indexed_total`** counter. Wired from the `ANS104_UNBUNDLE_COMPLETE` event handler off the `alreadyIndexedSkippedCount` value the parser already includes in its completion message. No label — the only realistic value would be `bundle_format` and ANS-104 is the only format in production use; an unused dimension on a Prometheus metric is just clutter. Tests unchanged (12 pass). Build + lint clean.
|
Might not ship this after all. |
|
Hey @arielmelendez — we're prepping a release and noticed your comment "might not ship this after all." Should we close PR #740, or do you want to revisit? Just want to align before cutting the next release. cc @vilenarios |
@vilenarios this hasn't been integrated or adequately tested yet and it's not clear to me that it's yet necessary for optimal system performance in the real-life scenarios we're encountering. Let's let this one hang back a bit longer while we continue fine tuning the system in other areas a bit more. |
Summary
When the bundle repair worker re-queues a bundle whose previous parse had some children dropped at the data-item indexer's queue cap, the parser used to re-emit every child — including the 90%+ already indexed. Each redundant emit consumed a slot in the
dataItemIndexer/ans104DataIndexerqueues, contending with the genuinely-new children for the cap. Under sustained backfill load this is the dominant contributor to the 2M+ items dropped per hour we observed on the Turbo gw2 indexer after PE-9098 landed.This PR adds a skip-set: before parsing, the
Ans104UnbundlerqueriesBundleIndex.getIndexedChildIds(parentId)for ids already innew_data_items∪stable_data_itemsunder this parent. The set is passed through to the parser worker. For each parsed child whose id is in the set, the parser counts it towardmatchedItemCount(preserving the bundle'smatched_data_item_countinvariant — see below) and skips the filter eval, hash, andDATA_ITEM_MATCHEDemit.Why count skipped items toward
matchedItemCountupsertBundleupdatesmatched_data_item_countwithIFNULL(@matched, matched)— passing a value replaces it. The fully-indexed predicate is strict equality betweenbundle_data_itemsrow count andmatched_data_item_count. If we lowered the emitted matched count by the skip amount, the new lower value would replace the existing one, and the equality would never hold afterwards (rows > matched, sofully_indexed_atnever sets and the bundle stays in the repair pool forever). Counting skips towardmatchedItemCountavoids that.Assumption: the unbundle/index filter has not changed between parses. When filters do change, the filter-reprocess path clears the bundle's state separately and
skipChildIdswill be empty for those bundles — so this assumption is enforced at the workflow layer, not implicit.Failure handling
If
getIndexedChildIdsthrows (DB lock, breaker open, etc.) we log and proceed without a skip-set. Worst case is the pre-fix behavior of redundant emits — not a regression, not a hard failure.Test plan
yarn buildcleanyarn lint:checkclean on changed filesyarn test:file src/workers/ans104-unbundler.test.ts src/workers/bundle-repair-worker.test.ts— 12 tests passdata_items_dropped_total{queue_name="dataItemIndexer"}and{queue_name="ans104DataIndexer"}rates to drop dramatically (skip rate on repair-driven re-parses should also fall as queue pressure decreases). NewalreadyIndexedSkippedCountis included in theUNBUNDLE_COMPLETEmessage; surfacing it as a metric is a follow-up.Notes
PE-9098-bundle-repair-bdi-routing(the previous PR). Once that lands, this PR's diff will be just the skip-set change.selectIndexedChildIds+getIndexedChildIds: SQL, worker impl, queue wrapper, message handler,BundleIndexinterface.alreadyIndexedSkippedCountvalue flows out of the parser worker throughUNBUNDLE_COMPLETEbut isn't yet wired to a Prometheus counter. A small follow-up should add one so we can verify the optimization from Grafana.🤖 Generated with Claude Code